
/*
 * Copyright © 2021 https://www.cestc.cn/ All rights reserved.
 */

package com.zx.learn.flink.watermark;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Demo job showing event-time tumbling windows combined with a bounded-out-of-orderness
 * watermark strategy.
 *
 * <p>An in-process source emits one simulated {@code Order} per second whose event time lags
 * the wall clock by a random 0-4 seconds. The stream is keyed by user id, grouped into
 * 5-second event-time windows, and the {@code money} field is summed per key and window.
 */
public class WatermarkDemo01 {

    /**
     * Builds the streaming pipeline and runs it until the job is cancelled.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Single parallel instance so the printed output of the demo is easy to follow.
        env.setParallelism(1);

        // 2. Source
        // Simulates real-time order data whose event times are delayed and out of order.
        DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() {
            // cancel() is invoked from a different thread than the one executing run(),
            // so the flag must be volatile for the loop to reliably observe the update.
            private volatile boolean flag = true;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (flag) {
                    String orderId = UUID.randomUUID().toString();
                    int userId = random.nextInt(3);
                    int money = random.nextInt(100);
                    // Simulate delay and disorder: event time lags "now" by 0-4 whole seconds.
                    long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;
                    ctx.collect(new Order(orderId, userId, money, eventTime));
                    TimeUnit.SECONDS.sleep(1);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        });

        // 3. Transformation
        // Uncomment to inspect the raw input records:
        //orderDS.printToErr();
        // Assign timestamps and watermarks; the maximum tolerated out-of-orderness is 3 seconds.
        // NOTE(review): the source can delay events by up to 4 seconds, which exceeds this bound,
        // so some records may arrive after their window has fired - presumably intentional for
        // the demo; confirm.
        SingleOutputStreamOperator<Order> sum = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy
                        // Maximum out-of-orderness the watermark generator allows for.
                        .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        // Take the event time from the element itself:
                        // long extractTimestamp(T element, long recordTimestamp);
                        .withTimestampAssigner((element, recordTimestamp) -> element.getEventTime()))
                // Aggregate the purchase amount per user.
                .keyBy(t -> t.getUserId())
                // Tumbling event-time window: every 5 seconds, aggregate that 5-second span.
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("money");

        // 4. Sink
        sum.print();

        // 5. Execute
        env.execute();
    }
}
